home *** CD-ROM | disk | FTP | other *** search
/ Language/OS - Multiplatform Resource Library / LANGUAGE OS.iso / p4 / p4-1_2c.lha / p4-1.2c / lib / p4_bm.c < prev    next >
C/C++ Source or Header  |  1993-06-14  |  14KB  |  478 lines

  1. #include "p4.h"
  2. #include "p4_sys.h"
  3.  
  4. int bm_start(argc, argv)
  5. int *argc;
  6. char **argv;
  7. {
  8.     int bm_switch_port;
  9.  
  10.     sprintf(whoami_p4, "p0_%d", getpid());
  11.     p4_dprintfl(90,"entering bm_start\n");
  12.  
  13.     trap_sig_errs();        /* Errors can happen any time */
  14.  
  15.     logging_flag = FALSE;
  16.     globmemsize = GLOBMEMSIZE;
  17.     sserver_port = 753;
  18.  
  19.     process_args(argc, argv);
  20.  
  21. #   ifdef SYSV_IPC
  22.     sysv_num_shmids = 0;
  23.     sysv_shmid[0]  = -1;
  24.     sysv_semid0    = -1;
  25.     sysv_semid0 = init_sysv_semset(0);
  26. #   endif
  27.  
  28.     MD_initmem(globmemsize);
  29.     alloc_global();  /* sets p4_global */
  30.  
  31.     if (*bm_outfile)
  32.     {
  33.     freopen(bm_outfile, "w", stdout);
  34.     freopen(bm_outfile, "w", stderr);
  35.     }
  36.  
  37.     p4_local = alloc_local_bm();
  38.     if (p4_local == NULL)
  39.     p4_error("p4_initenv: alloc_local_bm failed\n", NULL);
  40.  
  41.     MD_initenv();
  42.     bm_switch_port = getswport(p4_global->my_host_name);
  43.     usc_init();
  44.  
  45.     /* big master installing himself */
  46.     install_in_proctable(0, (-1), getpid(), p4_global->my_host_name, 
  47.              0, P4_MACHINE_TYPE, bm_switch_port);
  48.  
  49.     p4_local->my_id = 0;
  50.  
  51.     if (logging_flag)
  52.     ALOG_ENABLE;
  53.     else
  54.     ALOG_DISABLE;
  55.  
  56.     return (0);
  57. }
  58.  
  59. int p4_create_procgroup()
  60. {
  61.  
  62.     p4_dprintfl(90,"entering p4_create_procgroup\n");
  63.     if (p4_local->my_id != 0)
  64.     return(0);
  65.     if ((p4_local->procgroup = read_procgroup()) == NULL)
  66.     return (-1);
  67.     p4_startup(p4_local->procgroup);
  68.     return(0);
  69. }
  70.  
  71.  
  72. int p4_startup(pg)
  73. struct p4_procgroup *pg;
  74. {
  75.     int i, nslaves, unused_flag;
  76.     int listener_port, listener_fd;
  77.     struct bm_rm_msg bm_msg;
  78.     struct p4_procgroup_entry *local_pg;
  79.  
  80.     p4_dprintfl(90,"entering p4_startup\n");
  81.  
  82.     if (p4_global == NULL)
  83.     p4_error("p4 not initialized; perhaps p4_initenv not called",0);
  84.  
  85.     procgroup_to_proctable(pg);
  86.     if (pg->num_entries > 1)
  87.     p4_global->local_communication_only = FALSE;
  88.  
  89. #   ifdef CAN_DO_SOCKET_MSGS
  90.     if (!p4_global->local_communication_only)
  91.     {
  92.     net_setup_anon_listener(10, &listener_port, &listener_fd);
  93.     p4_global->listener_port = listener_port;
  94.     p4_global->listener_fd = listener_fd;
  95.     p4_dprintfl(90, "setup listener on port %d fd %d\n",
  96.             listener_port, listener_fd);
  97.     p4_global->proctable[0].port = listener_port;
  98.     SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt);
  99.     }
  100. #   endif
  101.  
  102.     setup_conntab();
  103.  
  104.     p4_lock(&p4_global->slave_lock);
  105.     if ((nslaves = create_bm_processes(pg)) < 0)
  106.     return (-1);
  107.     if (!p4_am_i_cluster_master())  /* I was forked in create_bm_processes */
  108.     return(0);
  109.  
  110. #   ifdef CAN_DO_SOCKET_MSGS
  111.     if (create_remote_processes(pg) < 0)
  112.     return (-1);
  113. #   endif
  114.  
  115.     /* let local slaves use proc table to identify themselves */
  116.     p4_unlock(&p4_global->slave_lock); 
  117.  
  118.     send_proc_table();  /* to remote masters */
  119.  
  120. #   if defined(IPSC860)  ||  defined(CM5)  ||  defined(NCUBE)
  121.     /* send initial info and proctable to local slaves */
  122.     /* must use p4_i_to_n procs because node slave 
  123.        does not know if the msg is forwarded from bm */
  124.     local_pg = &(pg->entries[0]);
  125.     bm_msg.type = p4_i_to_n(INITIAL_INFO);
  126.     bm_msg.numinproctab = p4_i_to_n(p4_global->num_in_proctable);
  127.     bm_msg.numslaves = p4_i_to_n(local_pg->numslaves_in_group);
  128.     bm_msg.debug_level = p4_i_to_n(remote_debug_level);
  129.     bm_msg.memsize = p4_i_to_n(globmemsize);
  130.     bm_msg.logging_flag = p4_i_to_n(logging_flag);
  131.     strcpy(bm_msg.application_id, p4_global->application_id);
  132.     strcpy(bm_msg.version, p4_version());
  133.     strcpy(bm_msg.pgm, local_pg->slave_full_pathname);
  134.     for (i = 1; i <= nslaves; i++)
  135.     {
  136.     p4_dprintfl(90,"sending initinfo to slave %d of %d\n",i,nslaves);
  137. #       if defined(IPSC860)
  138.     csend((long) INITIAL_INFO, &bm_msg, (long) sizeof(struct bm_rm_msg), 
  139.           (long) i, (long) NODE_PID);
  140.     csend((long) INITIAL_INFO, p4_global->proctable, 
  141.           (long) sizeof(p4_global->proctable), (long) i, (long) NODE_PID);
  142. #       endif
  143. #       if defined(CM5)
  144.     CMMD_send_noblock(i, INITIAL_INFO, &bm_msg, sizeof(struct bm_rm_msg));
  145.     CMMD_send_noblock(i, INITIAL_INFO, p4_global->proctable, sizeof(p4_global->proctable));
  146. #       endif
  147. #       if defined(NCUBE)
  148.     nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, INITIAL_INFO, &unused_flag);
  149.     nwrite(p4_global->proctable, sizeof(p4_global->proctable), i, INITIAL_INFO, &unused_flag);
  150. #       endif
  151.     p4_dprintfl(90,"sent initinfo to slave %d of %d\n",i,nslaves);
  152.     }
  153. #   endif
  154.  
  155.     p4_global->low_cluster_id = 
  156.     p4_local->my_id - p4_global->proctable[p4_local->my_id].slave_idx;
  157.     p4_global->hi_cluster_id = 
  158.     p4_global->low_cluster_id + p4_global->local_slave_count + 1;
  159.  
  160.     /* 
  161.        sync with local slaves thus insuring that they have the proctable before 
  162.        syncing with remotes (this keeps remotes from interrupting the local 
  163.        processes too early; then re-sync with local slaves (thus permitting them
  164.        to interrupt remotes)
  165.     */
  166.  
  167.     p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());
  168.     /* 
  169.        NEED A SYNC WITH LOCALS THAT DOES A BARRIER WITH PROCS THAT SHARE
  170.        MEMORY AND MP BARRIER WITH OTHER "LOCAL" PROCESSES 
  171.     */
  172.     sync_with_remotes();
  173.     p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());
  174.  
  175.  
  176.     return (0);
  177. }
  178.  
  179. int create_bm_processes(pg)
  180. struct p4_procgroup *pg;
  181. {
  182.     struct p4_procgroup_entry *local_pg;
  183.     struct listener_data *l;
  184.     int i, nslaves, end_1, end_2;
  185.     int slave_pid, listener_pid;
  186.     int slave_idx, listener_fd;
  187.     int port, switch_port, from, type, unused_flag;
  188.     struct bm_rm_msg bm_msg;
  189.  
  190.     p4_dprintfl(90,"entering create_bm_processes\n");
  191.     local_pg = &(pg->entries[0]);
  192.  
  193.     nslaves = local_pg->numslaves_in_group;
  194. #   if !defined(IPSC860)  &&  !defined(CM5)  &&  !defined(NCUBE)
  195.     if (nslaves > P4_MAX_MSG_QUEUES)
  196.     p4_error("more slaves than msg queues \n", nslaves);
  197. #   endif
  198.  
  199. /* alloc listener local data since this proc will eventually become listener */
  200. #   ifdef CAN_DO_SOCKET_MSGS
  201.     if (!(p4_global->local_communication_only))
  202.     {
  203.     listener_fd = p4_global->listener_fd;
  204.     listener_info = alloc_listener_info();
  205.     l = listener_info;
  206.     get_pipe(&end_1, &end_2);
  207.     }
  208. #   endif
  209.  
  210. #   ifdef TCMP
  211.     tcmp_init(NULL,p4_get_my_cluster_id(),shmem_getclunid());
  212. #   endif
  213.  
  214. #   if defined(IPSC860)  ||  defined(CM5)  ||  defined(NCUBE)
  215.     for (i = 1; i <= nslaves; i++)
  216.     {
  217.     p4_dprintfl(90,"doing initial sync with local slave %d\n",i);
  218. #       if defined(IPSC860)
  219.     csend((long) SYNC_MSG, &bm_msg, (long) sizeof(struct bm_rm_msg), 
  220.           (long) i, (long) NODE_PID);
  221.     crecv(INITIAL_INFO, &bm_msg, (long) sizeof(struct bm_rm_msg));
  222. #       endif
  223. #       if defined(CM5)
  224.     CMMD_send_noblock(i, SYNC_MSG, &bm_msg, sizeof(struct bm_rm_msg));
  225.         CMMD_receive(CMMD_ANY_NODE, INITIAL_INFO, (void *) &bm_msg, sizeof(struct bm_rm_msg));
  226. #       endif
  227. #       if defined(NCUBE)
  228.     nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, SYNC_MSG, &unused_flag);
  229.         from = NCUBE_ANY_NODE;
  230.         type = INITIAL_INFO;
  231.         nread(&bm_msg, sizeof(struct bm_rm_msg), &from, &type, &unused_flag);
  232. #       endif
  233.     port = p4_n_to_i(bm_msg.port);
  234.     slave_idx = p4_n_to_i(bm_msg.slave_idx);
  235.     slave_pid = p4_n_to_i(bm_msg.slave_pid);
  236.     switch_port = p4_n_to_i(bm_msg.switch_port);
  237.     /* big master installing local slaves */
  238.     install_in_proctable(0, port, slave_pid, bm_msg.host_name, 
  239.                  slave_idx, P4_MACHINE_TYPE, switch_port);
  240.     p4_global->local_slave_count++;
  241.     }
  242. #   else
  243.     for (slave_idx = 1; slave_idx <= nslaves; slave_idx++)
  244.     {
  245.     p4_dprintfl(20, "creating local slave %d of %d\n",slave_idx,nslaves);
  246.     slave_pid = fork_p4();
  247.     if (slave_pid < 0)
  248.         p4_error("create_bm_processes fork", slave_pid);
  249.     else
  250.         if (slave_pid)
  251.         p4_dprintfl(10, "created local slave %d\n", slave_idx);
  252.     if (slave_pid == 0)    /* At this point, we are the slave. */
  253.     {
  254.         sprintf(whoami_p4, "bm_slave_%d_%d", slave_idx, getpid());
  255.  
  256.         p4_free(p4_local);    /* Doesn't work for weird memory model. */
  257.         p4_local = alloc_local_slave();
  258.  
  259. #           ifdef CAN_DO_SOCKET_MSGS
  260.         if (!(p4_global->local_communication_only))
  261.         {
  262.         p4_local->listener_fd = end_1;
  263.         close(end_2);
  264.         close(listener_fd);
  265.         }
  266.         SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt);
  267. #           endif
  268.  
  269.         /* hang for a valid proctable */
  270.         p4_lock(&p4_global->slave_lock);
  271.         p4_unlock(&p4_global->slave_lock);
  272.  
  273.         p4_local->my_id = p4_get_my_id_from_proc();
  274.         setup_conntab();
  275.         sprintf(whoami_p4, "p%d_%d", p4_local->my_id, getpid());
  276.         usc_init();
  277.  
  278. #           ifdef TCMP
  279.             tcmp_init(NULL,p4_get_my_cluster_id(),shmem_getclunid());
  280. #           endif
  281.  
  282.         /* 
  283.            sync with master twice: once to make sure all slaves have 
  284.            got proctable, and second after the master has synced with the 
  285.            remote processes 
  286.         */
  287.         p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());
  288.         p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());
  289.  
  290.         p4_dprintfl(20, "local slave starting\n");
  291.         ALOG_SETUP(p4_local->my_id,ALOG_TRUNCATE);
  292.         ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  293.         return;
  294.     }
  295.  
  296.     /* master installing local slaves */
  297.     install_in_proctable(0, p4_global->listener_port, slave_pid,
  298.                  p4_global->my_host_name, 
  299.                  slave_idx, P4_MACHINE_TYPE,
  300.                  p4_global->proctable[0].switch_port);
  301.     p4_global->local_slave_count++;
  302.     }
  303. #   endif
  304.  
  305. #   if defined(CM5)
  306.     for (i=nslaves+1; i < CMMD_partition_size(); i++)
  307.         CMMD_send_noblock(i, DIE, &bm_msg, sizeof(struct bm_rm_msg));
  308. #   endif
  309. #   if defined(NCUBE)
  310.     for (i=nslaves+1; i < ncubesize(); i++)
  311.     nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, DIE, &unused_flag);
  312. #   endif
  313.  
  314.  
  315.     /* Done creating slaves. Now fork off the listener */
  316.  
  317. #   if !defined(IPSC860)  &&  !defined(CM5)  &&  !defined(NCUBE)
  318.  
  319. #   ifdef CAN_DO_SOCKET_MSGS
  320.     if (!(p4_global->local_communication_only))
  321.     {
  322.     listener_pid = fork_p4();
  323.     if (listener_pid < 0)
  324.         p4_error("create_bm_processes listener fork", listener_pid);
  325.     if (listener_pid == 0)
  326.     {
  327.         sprintf(whoami_p4, "bm_list_%d", getpid());
  328.         /* Inside listener */
  329.         p4_local = alloc_local_listener();
  330.         l->listening_fd = listener_fd;
  331.         l->slave_fd = end_2;
  332.         close(end_1);
  333.         {
  334.         /* exec external listener process */
  335.  
  336.         char *listener_prg = LISTENER_PATHNAME;
  337.  
  338.         if (*listener_prg)
  339.         {
  340.             char dbg_c[10], max_c[10], lfd_c[10], sfd_c[10];
  341.  
  342.             sprintf(dbg_c, "%d", debug_level);
  343.             sprintf(max_c, "%d", p4_global->max_connections);
  344.             sprintf(lfd_c, "%d", l->listening_fd);
  345.             sprintf(sfd_c, "%d", l->slave_fd);
  346.             p4_dprintfl(70, "exec %s %s %s %s %s\n",
  347.                 listener_prg, dbg_c, max_c, lfd_c, sfd_c);
  348.             execlp(listener_prg, listener_prg,
  349.                dbg_c, max_c, lfd_c, sfd_c, NULL);
  350.             p4_dprintfl(70, "exec failed (errno= %d), using buildin\n",
  351.                 errno);
  352.         }
  353.         }
  354.         listener();
  355.         exit(0);
  356.     }
  357.     }
  358. #   endif
  359.  
  360.     /* Else we're still in the big master */
  361.     sprintf(whoami_p4, "p0_%d", getpid());
  362.  
  363.     /* We need to close the fds from the listener setup */
  364. #   ifdef CAN_DO_SOCKET_MSGS
  365.     if (!(p4_global->local_communication_only))
  366.     {
  367.     p4_local->listener_fd = end_1;
  368.     close(listener_fd);
  369.     close(end_2);
  370.     p4_global->listener_pid = listener_pid;
  371.     }
  372. #   endif
  373.  
  374. #   endif
  375.  
  376.     dump_global(80);
  377.     p4_dprintfl(90, "create_bm_processes: exiting\n");
  378.     return (nslaves);
  379. }
  380.  
  381.  
  382. P4VOID procgroup_to_proctable(pg)
  383. struct p4_procgroup *pg;
  384. {
  385.     int i, j, ptidx;
  386.     struct p4_procgroup_entry *pe;
  387.  
  388.     if (strcmp(pg->entries[0].host_name,"local") != 0)
  389.     p4_error("local is not first entry in procgroup ",0);
  390.     strcpy(p4_global->proctable[0].host_name,p4_global->my_host_name);
  391.     get_qualified_hostname(p4_global->proctable[0].host_name);
  392.     p4_global->proctable[0].group_id = 0;
  393.     ptidx = 1;
  394.     for (i=0, pe=pg->entries; i < pg->num_entries; i++, pe++)
  395.     {
  396.     for (j=0; j < pe->numslaves_in_group; j++)
  397.     {
  398.         if (i == 0)
  399.         strcpy(p4_global->proctable[ptidx].host_name,p4_global->my_host_name);
  400.         else
  401.         strcpy(p4_global->proctable[ptidx].host_name,pe->host_name);
  402.         get_qualified_hostname(p4_global->proctable[ptidx].host_name);
  403.         p4_global->proctable[ptidx].group_id = i;
  404.         ptidx++;
  405.     }
  406.     p4_global->num_in_proctable = ptidx;
  407.     }
  408. }
  409.  
  410. P4VOID sync_with_remotes()
  411. {
  412.     struct bm_rm_msg msg;
  413.     int i, fd, node, num_rms, rm[P4_MAXPROCS];
  414.  
  415.     p4_dprintfl(90, "sync_with_remotes: starting\n");
  416.  
  417. #   ifdef CAN_DO_SOCKET_MSGS
  418.     p4_get_cluster_masters(&num_rms, rm);
  419.     for (i = 1; i < num_rms; i++)
  420.     {
  421.     node = rm[i];
  422.     fd = p4_local->conntab[node].port;
  423.     net_recv(fd, &msg, sizeof(msg));
  424.     msg.type = p4_n_to_i(msg.type);
  425.     if (msg.type != SYNC_MSG)
  426.         p4_error("sync_with_remotes: bad type rcvd\n",msg.type);
  427.     }
  428.     for (i = 1; i < num_rms; i++)
  429.     {
  430.     node = rm[i];
  431.     fd = p4_local->conntab[node].port;
  432.     msg.type = p4_i_to_n(SYNC_MSG);
  433.     net_send(fd, &msg, sizeof(msg), FALSE);
  434.     }
  435. #   endif
  436. }
  437.  
  438. P4VOID send_proc_table()
  439. {
  440.     int slave_idx, ent;
  441.     int fd;
  442.     struct bm_rm_msg msg;
  443.     struct proc_info *pe;
  444.  
  445.     p4_dprintfl(90, "send_proc_table: starting\n");
  446.  
  447. #   ifdef CAN_DO_SOCKET_MSGS
  448.     for (slave_idx = 1; slave_idx < p4_global->num_in_proctable; slave_idx++)
  449.     {
  450.     if (p4_global->proctable[slave_idx].slave_idx != 0)
  451.         continue;
  452.  
  453.     fd = p4_local->conntab[slave_idx].port;
  454.  
  455.     p4_dprintfl(90, "sending proctable to slave %d on %d:\n", slave_idx, fd);
  456.     if (fd < 0)
  457.         p4_error("send_proc_table: rm entry doesn't have valid fd", fd);
  458.  
  459.     for (ent = 0, pe = p4_global->proctable;
  460.          ent < p4_global->num_in_proctable; ent++, pe++)
  461.     {
  462.         msg.type = p4_i_to_n(PROC_TABLE_ENTRY);
  463.         msg.port = p4_i_to_n(pe->port);
  464.         msg.unix_id = p4_i_to_n(pe->unix_id);
  465.         msg.slave_idx = p4_i_to_n(pe->slave_idx);
  466.         msg.group_id = p4_i_to_n(pe->group_id);
  467.         strcpy(msg.host_name, pe->host_name);
  468.         strcpy(msg.machine_type,pe->machine_type);
  469.         msg.switch_port = p4_i_to_n(pe->switch_port);
  470.         net_send(fd, &msg, sizeof(msg), FALSE);
  471.     }
  472.     p4_dprintfl(90, "  sending end_of_proc_table\n");
  473.     msg.type = p4_i_to_n(PROC_TABLE_END);
  474.     net_send(fd, &msg, sizeof(msg), FALSE);
  475.     }
  476. #   endif
  477. }
  478.